热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

部将|放入_如何扩展HiveMetastoreThriftRPC服务接口

篇首语:本文由编程笔记#小编为大家整理,主要介绍了如何扩展HiveMetastoreThriftRPC服务接口相关的知识,希望对你有一定的参考价值。Hive在经历

篇首语:本文由编程笔记#小编为大家整理,主要介绍了如何扩展Hive Metastore Thrift RPC服务接口相关的知识,希望对你有一定的参考价值。


Hive 在经历十几年的发展,已经获得广泛应用,随着版本升级,不同版本之间的协议接口会发生一些变化,尽管HMS在尽量保持向前兼容,但在大版本变更时,仍然不能保证完全兼容,比如HMS2到HMS3,有关索引(Index)相关的接口变成了约束(constraint),比如下图中左边是HMS3接口定义,右边是HMS2接口定义:



这样当使用Hive 2的Client去访问HMS 3 的服务时候,就会报无效方法(Invalid Method)异常,此时可以通过一个Hive Metastore代理服务来解决。


在具体介绍Hive Metastore代理服务前,需要了解下Hive Metastore服务处理流程:


HMSHandler baseHandler = new HiveMetaStore.HMSHandler("metaserver", conf,false);
IHMSHandler handler = newRetryingHMSHandler(baseHandler, conf);
TProcessor processor &#61; new ThriftHiveMetastore.Processor<>(handler));
final TThreadPoolServer.Args serverArgs &#61; new TThreadPoolServer
.Args(serverTransport)
.transportFactory(new TTransportFactory())
.protocolFactory(new TBinaryProtocol.Factory())
.processor(processor);
TServer server &#61; new TThreadPoolServer(serverArgs);
server.setServerEventHandler(getServerEventHandler());
server.serve();

上述示例代码启动了Hive Metastore的服务&#xff0c;其中两个重要的组件&#xff1a;


  • TThreadPoolServer&#xff1a;负责请求的监听和派遣

  • TProcessor&#xff1a;负责执行各RPC接口的实现逻辑


TThreadPoolServer功能介绍


TThreadPoolServer主要负责请求的监听和派遣&#xff0c;当接收到新的请求时&#xff0c;在内部将每个client的请求放入Worker线程池&#xff0c;然后继续下一个请求的监听&#xff0c;而工作线程负责一对一处理每个client的请求&#xff0c;直至处理完毕&#xff1a;


#TThreadPoolServer.class
public class TThreadPoolServer extends TServer
//private ExecutorService executorService_;

public void serve()
...
while(!this.stopped_)
TTransport client &#61; this.serverTransport_.accept();
TThreadPoolServer.WorkerProcess wp &#61; new TThreadPoolServer.WorkerProcess(client);
this.executorService_.execute(wp);

...
this.executorService_.shutdown();

...
private class WorkerProcess implements Runnable
//private TTransport client_;
public void run()
do
if (eventHandler !&#61; null)
eventHandler.processContext(connectionContext, inputTransport, outputTransport);

while(!TThreadPoolServer.this.stopped_ && processor.process(inputProtocol, outputProtocol));
if (this.client_.isOpen())
this.client_.close();




TProcessor功能介绍


上面介绍了WorkerProcess的工作功能&#xff0c;TThreadPoolServer将每个客户端连接整体扔给TProcessor&#xff0c;没有介绍在其通过processor.process处理时候&#xff0c;如何定位每个不同的方法&#xff0c;实际上区分不同方法调用的逻辑同时也是TProcessor负责的&#xff0c;以Processor为例&#xff0c;定义了所支持的接口映射&#xff1a;


#ThriftHiveMetastore.class
public static class Processor extends com.facebook.fb303.FacebookService.Processor implements TProcessor
private static final Logger LOGGER &#61; LoggerFactory.getLogger(ThriftHiveMetastore.Processor.class.getName());
public Processor(I iface)
super(iface, getProcessMap(new HashMap()));

//Note:processMap
protected Processor(I iface, Map> processMap)
super(iface, getProcessMap(processMap));

private static Map> getProcessMap(Map> processMap)
processMap.put("create_catalog", new ThriftHiveMetastore.Processor.create_catalog());
processMap.put("alter_catalog", new ThriftHiveMetastore.Processor.alter_catalog());
processMap.put("get_catalog", new ThriftHiveMetastore.Processor.get_catalog());
processMap.put("get_catalogs", new ThriftHiveMetastore.Processor.get_catalogs());
processMap.put("drop_catalog", new ThriftHiveMetastore.Processor.drop_catalog());
processMap.put("create_database", new ThriftHiveMetastore.Processor.create_database());
processMap.put("get_database", new ThriftHiveMetastore.Processor.get_database());
processMap.put("drop_database", new ThriftHiveMetastore.Processor.drop_database());
...


也就是凡是登记在processMap的方法才能被识别&#xff0c;否则都会报异常&#xff0c;这个逻辑可以在Processor的父类TBaseProcessor中得知&#xff1a;


public abstract class TBaseProcessor implements TProcessor
private final I iface;
private final Map> processMap;
protected TBaseProcessor(I iface, Map> processFunctionMap)
this.iface &#61; iface;
this.processMap &#61; processFunctionMap;

public Map> getProcessMapView()
return Collections.unmodifiableMap(this.processMap);


public boolean process(TProtocol in, TProtocol out) throws TException
TMessage msg &#61; in.readMessageBegin();
ProcessFunction fn &#61; (ProcessFunction)this.processMap.get(msg.name);
if (fn &#61;&#61; null)
TProtocolUtil.skip(in, (byte)12);
in.readMessageEnd();
TApplicationException x &#61; new TApplicationException(1, "Invalid method name: &#39;" &#43; msg.name &#43; "&#39;");
out.writeMessageBegin(new TMessage(msg.name, (byte)3, msg.seqid));
x.write(out);
out.writeMessageEnd();
out.getTransport().flush();
return true;
else
fn.process(msg.seqid, in, out, this.iface);
return true;


在process方法中&#xff0c;根据客户端调用传来的方法名称去查找processMap方法映射&#xff0c;如果找不到就报异常。如何添加非注册方法呢&#xff1f;观察Processor的构造方法&#xff1a;


#ThriftHiveMetastore.class
public static class Processor extends com.facebook.fb303.FacebookService.Processor implements TProcessor
private static final Logger LOGGER &#61; LoggerFactory.getLogger(ThriftHiveMetastore.Processor.class.getName());
public Processor(I iface)
super(iface, getProcessMap(new HashMap()));

//Note:processMap
protected Processor(I iface, Map> processMap)
super(iface, getProcessMap(processMap));

...

虽然可以传入processMap&#xff0c;但是该构造方法是protected&#xff0c;不允许直接创建对象 &#xff0c;只能通过继承来扩展&#xff0c;比如这样&#xff1a;


public static class MetastoreProcessor extends ThriftHiveMetastore.Processor
private static final Logger LOGGER &#61; LoggerFactory.getLogger(MetastoreProcess.class.getName());
public MetastoreProcess(I iface)
super(iface, getProcessMap(new HashMap>()));

protected MetastoreProcess(I iface, Map> processMap)
super(iface, getProcessMap(processMap));

private static Map> getProcessMap(Map> processMap)
processMap.put("add_index", new add_index());
processMap.put("alter_index", new alter_index());
processMap.put("drop_index_by_name", new drop_index_by_name());
processMap.put("get_index_by_name", new get_index_by_name());
processMap.put("get_indexes", new get_indexes());
processMap.put("get_index_names", new get_index_names());
return processMap;


然后修改TProcessor实例&#xff0c;重建服务&#xff1a;


HMSHandler baseHandler &#61; new HiveMetaStore.HMSHandler("metaserver", conf,false);
IHMSHandler handler &#61; newRetryingHMSHandler(baseHandler, conf);
TProcessor processor &#61; new MetastoreProcessor<>(handler));
final TThreadPoolServer.Args serverArgs &#61; new TThreadPoolServer
.Args(serverTransport)
.transportFactory(new TTransportFactory())
.protocolFactory(new TBinaryProtocol.Factory())
.processor(processor);

推荐阅读
  • Spring特性实现接口多类的动态调用详解
    本文详细介绍了如何使用Spring特性实现接口多类的动态调用。通过对Spring IoC容器的基础类BeanFactory和ApplicationContext的介绍,以及getBeansOfType方法的应用,解决了在实际工作中遇到的接口及多个实现类的问题。同时,文章还提到了SPI使用的不便之处,并介绍了借助ApplicationContext实现需求的方法。阅读本文,你将了解到Spring特性的实现原理和实际应用方式。 ... [详细]
  • 如何自行分析定位SAP BSP错误
    The“BSPtag”Imentionedintheblogtitlemeansforexamplethetagchtmlb:configCelleratorbelowwhichi ... [详细]
  • SpringBoot uri统一权限管理的实现方法及步骤详解
    本文详细介绍了SpringBoot中实现uri统一权限管理的方法,包括表结构定义、自动统计URI并自动删除脏数据、程序启动加载等步骤。通过该方法可以提高系统的安全性,实现对系统任意接口的权限拦截验证。 ... [详细]
  • 本文分享了一个关于在C#中使用异步代码的问题,作者在控制台中运行时代码正常工作,但在Windows窗体中却无法正常工作。作者尝试搜索局域网上的主机,但在窗体中计数器没有减少。文章提供了相关的代码和解决思路。 ... [详细]
  • 本文介绍了九度OnlineJudge中的1002题目“Grading”的解决方法。该题目要求设计一个公平的评分过程,将每个考题分配给3个独立的专家,如果他们的评分不一致,则需要请一位裁判做出最终决定。文章详细描述了评分规则,并给出了解决该问题的程序。 ... [详细]
  • 本文介绍了使用PHP实现断点续传乱序合并文件的方法和源码。由于网络原因,文件需要分割成多个部分发送,因此无法按顺序接收。文章中提供了merge2.php的源码,通过使用shuffle函数打乱文件读取顺序,实现了乱序合并文件的功能。同时,还介绍了filesize、glob、unlink、fopen等相关函数的使用。阅读本文可以了解如何使用PHP实现断点续传乱序合并文件的具体步骤。 ... [详细]
  • 本文讨论了在Spring 3.1中,数据源未能自动连接到@Configuration类的错误原因,并提供了解决方法。作者发现了错误的原因,并在代码中手动定义了PersistenceAnnotationBeanPostProcessor。作者删除了该定义后,问题得到解决。此外,作者还指出了默认的PersistenceAnnotationBeanPostProcessor的注册方式,并提供了自定义该bean定义的方法。 ... [详细]
  • HDFS2.x新特性
    一、集群间数据拷贝scp实现两个远程主机之间的文件复制scp-rhello.txtroothadoop103:useratguiguhello.txt推pushscp-rr ... [详细]
  • 前景:当UI一个查询条件为多项选择,或录入多个条件的时候,比如查询所有名称里面包含以下动态条件,需要模糊查询里面每一项时比如是这样一个数组条件:newstring[]{兴业银行, ... [详细]
  • 本文介绍了iOS数据库Sqlite的SQL语句分类和常见约束关键字。SQL语句分为DDL、DML和DQL三种类型,其中DDL语句用于定义、删除和修改数据表,关键字包括create、drop和alter。常见约束关键字包括if not exists、if exists、primary key、autoincrement、not null和default。此外,还介绍了常见的数据库数据类型,包括integer、text和real。 ... [详细]
  • Ihavethefollowingonhtml我在html上有以下内容<html><head><scriptsrc..3003_Tes ... [详细]
  • 本文介绍了如何使用Express App提供静态文件,同时提到了一些不需要使用的文件,如package.json和/.ssh/known_hosts,并解释了为什么app.get('*')无法捕获所有请求以及为什么app.use(express.static(__dirname))可能会提供不需要的文件。 ... [详细]
  • Spring学习(4):Spring管理对象之间的关联关系
    本文是关于Spring学习的第四篇文章,讲述了Spring框架中管理对象之间的关联关系。文章介绍了MessageService类和MessagePrinter类的实现,并解释了它们之间的关联关系。通过学习本文,读者可以了解Spring框架中对象之间的关联关系的概念和实现方式。 ... [详细]
  • PDO MySQL
    PDOMySQL如果文章有成千上万篇,该怎样保存?数据保存有多种方式,比如单机文件、单机数据库(SQLite)、网络数据库(MySQL、MariaDB)等等。根据项目来选择,做We ... [详细]
  • ZSI.generate.Wsdl2PythonError: unsupported local simpleType restriction ... [详细]
author-avatar
布瓜Pourqu2502854853
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有